Hudi SQL DDL

Applies to version 0.10.1.

The following are Spark SQL operations.

1 Create Table (Spark)

Only Spark SQL requires creating tables explicitly; when writing through a custom program, tables that do not exist are created automatically.

(1) Options

Parameter Name  | Description                                                                      | Required? | Default Value
primaryKey      | The primary key field names of the table; separate multiple fields with commas. | Optional  | id
type            | The type of table to create: cow = COPY-ON-WRITE, mor = MERGE-ON-READ.          | Optional  | cow
preCombineField | The pre-combine field of the table.                                              | Optional  | ts

(2) Table Type

-- create a non-primary-key table
create table if not exists hudi_table2(
  id int,
  name string,
  price double
) using hudi
options (
  type = 'cow'
);
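
For comparison, a merge-on-read variant only changes the type option. A minimal sketch following the same pattern (the table name hudi_table2_mor is illustrative, not from the docs):

-- create a non-primary-key mor table (sketch)
create table if not exists hudi_table2_mor(
  id int,
  name string,
  price double
) using hudi
options (
  type = 'mor'
);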

(3) Primary Key

-- create a managed cow table
create table if not exists hudi_table0 (
  id int,
  name string,
  price double
) using hudi
options (
  type = 'cow',
  primaryKey = 'id'
);
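
A minimal usage sketch against the table above (a plain insert and query; how duplicate primary keys are handled on insert depends on the insert-mode configs of your Hudi version):

insert into hudi_table0 values (1, 'a1', 20);
select id, name, price from hudi_table0;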

(4) Pre-Combine Field

-- create a mor table with a composite primary key and a pre-combine field
create table if not exists hudi_table1 (
  id int,
  name string,
  price double,
  ts bigint
) using hudi
options (
  type = 'mor',
  primaryKey = 'id,name',
  preCombineField = 'ts'
);
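
The pre-combine field decides which record wins when two writes carry the same key. A hedged sketch of the expected behavior with Hudi's default payload under upsert semantics (exact behavior depends on the insert mode in effect):

insert into hudi_table1 values (1, 'a1', 10.0, 1001);
insert into hudi_table1 values (1, 'a1', 11.0, 1000);
-- under upsert semantics the record with the larger ts (1001) is expected to survive
select id, name, price, ts from hudi_table1;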

(5) Partitioned Table

create table if not exists hudi_table_p0 (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
options (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
)
partitioned by (dt, hh);
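
A usage sketch for the partitioned table: a static-partition insert followed by a partition-pruned query (standard Spark SQL syntax; the values are illustrative):

insert into hudi_table_p0 partition (dt = '2021-12-09', hh = '10')
select 2, 'a2', 1000;

select * from hudi_table_p0 where dt = '2021-12-09' and hh = '10';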

(6) Create External Table

Use location to specify the table path; otherwise the table is treated as a managed table. For the difference between the two, see https://sparkbyexamples.com/apache-hive/difference-between-hive-internal-tables-and-external-tables/.

create table h_p1 using hudi
options (
  primaryKey = 'id',
  preCombineField = 'ts'
)
partitioned by (dt)
location '/path/to/hudi';
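
The practical difference shows up on drop: dropping an external table removes only the catalog entry. A sketch of this Hive/Spark semantic (Hudi-specific drop behavior may vary by version):

drop table h_p1;

-- the files under /path/to/hudi are expected to remain, so the table
-- can be re-registered by repeating the original statement
create table h_p1 using hudi
options (
  primaryKey = 'id',
  preCombineField = 'ts'
)
partitioned by (dt)
location '/path/to/hudi';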

(7) CTAS

CTAS writes with the bulk_insert operation.

-- non-partitioned cow table
create table h3 using hudi
as
select 1 as id, 'a1' as name, 10 as price;

-- partitioned cow table with a primary key
create table h2 using hudi
options (type = 'cow', primaryKey = 'id')
partitioned by (dt)
as
select 1 as id, 'a1' as name, 10 as price, 1000 as dt;

-- load data from another table
-- create a managed parquet table
create table parquet_mngd using parquet location 'file:///tmp/parquet_dataset/*.parquet';

-- CTAS by loading data into a hudi table
create table hudi_tbl using hudi location 'file:/tmp/hudi/hudi_tbl/' options (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
)
partitioned by (datestr) as select * from parquet_mngd;

(8) Hudi Configs

Table-scoped options can be set when creating the table and changed later with the set command.

-- template
create table if not exists h3(
  id bigint,
  name string,
  price double
) using hudi
options (
  primaryKey = 'id',
  type = 'mor',
  ${hoodie.config.key1} = '${hoodie.config.value1}',
  ${hoodie.config.key2} = '${hoodie.config.value2}',
  ....
);

-- example
create table if not exists h3(
  id bigint,
  name string,
  price double
) using hudi
options (
  primaryKey = 'id',
  type = 'mor',
  hoodie.cleaner.fileversions.retained = '20',
  hoodie.keep.max.commits = '20'
);
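
Options set this way are persisted with the table. A quick way to inspect them is Spark's built-in show tblproperties (a sketch; which keys are actually stored varies by version):

show tblproperties h3;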

2 Alter Table

(1) Syntax

-- Alter table name
ALTER TABLE oldTableName RENAME TO newTableName

-- Alter table add columns
ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)

-- Alter table column type
ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType

(2) Examples

alter table h0 rename to h0_1;

alter table h0_1 add columns(ext0 string);

alter table h0_1 change column id id bigint;

(3) Hudi Configs

Write configs can be set with set serdeproperties.

alter table h3 set serdeproperties (hoodie.keep.max.commits = '10');

(4) Set Command

Sets configs for the current session.

set hoodie.insert.shuffle.parallelism = 100;
set hoodie.upsert.shuffle.parallelism = 100;
set hoodie.delete.shuffle.parallelism = 100;
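
Session-level configs apply to every statement issued afterwards in the same session. A minimal sketch reusing hudi_table0 from above:

set hoodie.upsert.shuffle.parallelism = 200;
-- writes issued after the set pick up the session-level value
insert into hudi_table0 values (3, 'a3', 30);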

The following are Flink SQL operations.

3 Create Table (Flink)

For details, see the Flink Quick Start guide.

CREATE TABLE hudi_table2(
  id int,
  name string,
  price double
)
WITH (
  'connector' = 'hudi',
  'path' = 's3://bucket-name/hudi/',
  'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table; the default is COPY_ON_WRITE
);
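
A minimal write and read against this table (a sketch assuming a Flink SQL client session with the Hudi connector on the classpath; depending on the Hudi version, a record key column or primary key may also need to be declared):

INSERT INTO hudi_table2 VALUES (1, 'a1', 10.0);
SELECT * FROM hudi_table2;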

4 Alter Table (Flink)

alter table h0 rename to h0_1;
